1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.operators;
17  
18  /**
19   * Copyright 2014 Netflix, Inc.
20   * 
21   * Licensed under the Apache License, Version 2.0 (the "License");
22   * you may not use this file except in compliance with the License.
23   * You may obtain a copy of the License at
24   * 
25   * http://www.apache.org/licenses/LICENSE-2.0
26   * 
27   * Unless required by applicable law or agreed to in writing, software
28   * distributed under the License is distributed on an "AS IS" BASIS,
29   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
30   * See the License for the specific language governing permissions and
31   * limitations under the License.
32   */
33  
34  import static rx.Observable.create;
35  
36  import java.util.concurrent.atomic.AtomicBoolean;
37  import java.util.concurrent.atomic.AtomicLong;
38  import java.util.concurrent.atomic.AtomicReference;
39  
40  import rx.Notification;
41  import rx.Observable;
42  import rx.Observable.OnSubscribe;
43  import rx.Observable.Operator;
44  import rx.Producer;
45  import rx.Scheduler;
46  import rx.Subscriber;
47  import rx.functions.Action0;
48  import rx.functions.Func1;
49  import rx.functions.Func2;
50  import rx.schedulers.Schedulers;
51  import rx.subjects.PublishSubject;
52  import rx.subscriptions.SerialSubscription;
53  
54  public final class OnSubscribeRedo<T> implements OnSubscribe<T> {
55  
56      static final Func1<Observable<? extends Notification<?>>, Observable<?>> REDO_INIFINITE = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
57          @Override
58          public Observable<?> call(Observable<? extends Notification<?>> ts) {
59              return ts.map(new Func1<Notification<?>, Notification<?>>() {
60                  @Override
61                  public Notification<?> call(Notification<?> terminal) {
62                      return Notification.createOnNext(null);
63                  }
64              });
65          }
66      };
67  
68      public static final class RedoFinite implements Func1<Observable<? extends Notification<?>>, Observable<?>> {
69          private final long count;
70  
71          public RedoFinite(long count) {
72              this.count = count;
73          }
74  
75          @Override
76          public Observable<?> call(Observable<? extends Notification<?>> ts) {
77              return ts.map(new Func1<Notification<?>, Notification<?>>() {
78  
79                  int num=0;
80                  
81                  @Override
82                  public Notification<?> call(Notification<?> terminalNotification) {
83                      if(count == 0) {
84                          return terminalNotification;
85                      }
86                      
87                      num++;
88                      if(num <= count) {
89                          return Notification.createOnNext(num);
90                      } else {
91                          return terminalNotification;
92                      }
93                  }
94                  
95              }).dematerialize();
96          }
97      }
98  
99      public static final class RetryWithPredicate implements Func1<Observable<? extends Notification<?>>, Observable<? extends Notification<?>>> {
100         private Func2<Integer, Throwable, Boolean> predicate;
101 
102         public RetryWithPredicate(Func2<Integer, Throwable, Boolean> predicate) {
103             this.predicate = predicate;
104         }
105 
106         @Override
107         public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> ts) {
108             return ts.scan(Notification.createOnNext(0), new Func2<Notification<Integer>, Notification<?>, Notification<Integer>>() {
109                 @SuppressWarnings("unchecked")
110                 @Override
111                 public Notification<Integer> call(Notification<Integer> n, Notification<?> term) {
112                     final int value = n.getValue();
113                     if (predicate.call(value, term.getThrowable()).booleanValue())
114                         return Notification.createOnNext(value + 1);
115                     else
116                         return (Notification<Integer>) term;
117                 }
118             });
119         }
120     }
121 
122     public static <T> Observable<T> retry(Observable<T> source) {
123         return retry(source, REDO_INIFINITE);
124     }
125 
126     public static <T> Observable<T> retry(Observable<T> source, final long count) {
127         if (count < 0)
128             throw new IllegalArgumentException("count >= 0 expected");
129         if (count == 0)
130             return source;
131         return retry(source, new RedoFinite(count));
132     }
133 
134     public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
135         return create(new OnSubscribeRedo<T>(source, notificationHandler, true, false, Schedulers.trampoline()));
136     }
137 
138     public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
139         return create(new OnSubscribeRedo<T>(source, notificationHandler, true, false, scheduler));
140     }
141 
142     public static <T> Observable<T> repeat(Observable<T> source) {
143         return repeat(source, Schedulers.trampoline());
144     }
145 
146     public static <T> Observable<T> repeat(Observable<T> source, Scheduler scheduler) {
147         return repeat(source, REDO_INIFINITE, scheduler);
148     }
149 
150     public static <T> Observable<T> repeat(Observable<T> source, final long count) {
151         return repeat(source, count, Schedulers.trampoline());
152     }
153 
154     public static <T> Observable<T> repeat(Observable<T> source, final long count, Scheduler scheduler) {
155         if(count == 0) {
156             return Observable.empty();
157         }
158         if (count < 0)
159             throw new IllegalArgumentException("count >= 0 expected");
160         return repeat(source, new RedoFinite(count - 1), scheduler);
161     }
162 
163     public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
164         return create(new OnSubscribeRedo<T>(source, notificationHandler, false, true, Schedulers.trampoline()));
165     }
166 
167     public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
168         return create(new OnSubscribeRedo<T>(source, notificationHandler, false, true, scheduler));
169     }
170 
171     public static <T> Observable<T> redo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
172         return create(new OnSubscribeRedo<T>(source, notificationHandler, false, false, scheduler));
173     }
174 
175     private Observable<T> source;
176     private final Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> controlHandlerFunction;
177     private boolean stopOnComplete;
178     private boolean stopOnError;
179     private final Scheduler scheduler;
180 
181     private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f, boolean stopOnComplete, boolean stopOnError,
182             Scheduler scheduler) {
183         this.source = source;
184         this.controlHandlerFunction = f;
185         this.stopOnComplete = stopOnComplete;
186         this.stopOnError = stopOnError;
187         this.scheduler = scheduler;
188     }
189 
190     @Override
191     public void call(final Subscriber<? super T> child) {
192         final AtomicBoolean isLocked = new AtomicBoolean(true);
193         final AtomicBoolean resumeBoundary = new AtomicBoolean(true);
194         // incremented when requests are made, decremented when requests are fulfilled
195         final AtomicLong consumerCapacity = new AtomicLong(0l);
196         final AtomicReference<Producer> currentProducer = new AtomicReference<Producer>();
197 
198         final Scheduler.Worker worker = scheduler.createWorker();
199         child.add(worker);
200 
201         final SerialSubscription sourceSubscriptions = new SerialSubscription();
202         child.add(sourceSubscriptions);
203 
204         final PublishSubject<Notification<?>> terminals = PublishSubject.create();
205 
206         final Action0 subscribeToSource = new Action0() {
207             @Override
208             public void call() {
209                 if (child.isUnsubscribed()) {
210                     return;
211                 }
212 
213                 Subscriber<T> terminalDelegatingSubscriber = new Subscriber<T>() {
214                     boolean done;
215                     @Override
216                     public void onCompleted() {
217                         if (!done) {
218                             done = true;
219                             currentProducer.set(null);
220                             unsubscribe();
221                             terminals.onNext(Notification.createOnCompleted());
222                         }
223                     }
224 
225                     @Override
226                     public void onError(Throwable e) {
227                         if (!done) {
228                             done = true;
229                             currentProducer.set(null);
230                             unsubscribe();
231                             terminals.onNext(Notification.createOnError(e));
232                         }
233                     }
234 
235                     @Override
236                     public void onNext(T v) {
237                         if (!done) {
238                             if (consumerCapacity.get() != Long.MAX_VALUE) {
239                                 consumerCapacity.decrementAndGet();
240                             }
241                             child.onNext(v);
242                         }
243                     }
244 
245                     @Override
246                     public void setProducer(Producer producer) {
247                         currentProducer.set(producer);
248                         long c = consumerCapacity.get();
249                         if (c > 0) {
250                             producer.request(c);
251                         }
252                     }
253                 };
254                 // new subscription each time so if it unsubscribes itself it does not prevent retries
255                 // by unsubscribing the child subscription
256                 sourceSubscriptions.set(terminalDelegatingSubscriber);
257                 source.unsafeSubscribe(terminalDelegatingSubscriber);
258             }
259         };
260 
261         // the observable received by the control handler function will receive notifications of onCompleted in the case of 'repeat' 
262         // type operators or notifications of onError for 'retry' this is done by lifting in a custom operator to selectively divert 
263         // the retry/repeat relevant values to the control handler
264         final Observable<?> restarts = controlHandlerFunction.call(
265                 terminals.lift(new Operator<Notification<?>, Notification<?>>() {
266                     @Override
267                     public Subscriber<? super Notification<?>> call(final Subscriber<? super Notification<?>> filteredTerminals) {
268                         return new Subscriber<Notification<?>>(filteredTerminals) {
269                             @Override
270                             public void onCompleted() {
271                                 filteredTerminals.onCompleted();
272                             }
273 
274                             @Override
275                             public void onError(Throwable e) {
276                                 filteredTerminals.onError(e);
277                             }
278 
279                             @Override
280                             public void onNext(Notification<?> t) {
281                                 if (t.isOnCompleted() && stopOnComplete)
282                                     child.onCompleted();
283                                 else if (t.isOnError() && stopOnError)
284                                     child.onError(t.getThrowable());
285                                 else {
286                                     isLocked.set(false);
287                                     filteredTerminals.onNext(t);
288                                 }
289                             }
290 
291                             @Override
292                             public void setProducer(Producer producer) {
293                                 producer.request(Long.MAX_VALUE);
294                             }
295                         };
296                     }
297                 }));
298 
299         // subscribe to the restarts observable to know when to schedule the next redo.
300         worker.schedule(new Action0() {
301             @Override
302             public void call() {
303                 restarts.unsafeSubscribe(new Subscriber<Object>(child) {
304                     @Override
305                     public void onCompleted() {
306                         child.onCompleted();
307                     }
308 
309                     @Override
310                     public void onError(Throwable e) {
311                         child.onError(e);
312                     }
313 
314                     @Override
315                     public void onNext(Object t) {
316                         if (!isLocked.get() && !child.isUnsubscribed()) {
317                             if (consumerCapacity.get() > 0) {
318                                 worker.schedule(subscribeToSource);
319                             } else {
320                                 resumeBoundary.compareAndSet(false, true);
321                             }
322                         }
323                     }
324 
325                     @Override
326                     public void setProducer(Producer producer) {
327                         producer.request(Long.MAX_VALUE);
328                     }
329                 });
330             }
331         });
332 
333         child.setProducer(new Producer() {
334 
335             @Override
336             public void request(final long n) {
337                 long c = BackpressureUtils.getAndAddRequest(consumerCapacity, n);
338                 Producer producer = currentProducer.get();
339                 if (producer != null) {
340                     producer.request(n);
341                 } else
342                 if (c == 0 && resumeBoundary.compareAndSet(true, false)) {
343                     worker.schedule(subscribeToSource);
344                 }
345             }
346         });
347         
348     }
349 }